{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Query Data with AWS Data Wrangler"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**AWS Data Wrangler** is an open-source Python library that extends the power of the Pandas library to AWS connecting DataFrames and AWS data related services (Amazon Redshift, AWS Glue, Amazon Athena, Amazon EMR, Amazon QuickSight, etc).\n",
    "\n",
    "* https://github.com/awslabs/aws-data-wrangler\n",
    "* https://aws-data-wrangler.readthedocs.io\n",
    "\n",
    "Built on top of other open-source projects like Pandas, Apache Arrow, Boto3, s3fs, SQLAlchemy, Psycopg2 and PyMySQL, it offers abstracted functions to execute usual ETL tasks like load/unload data from Data Lakes, Data Warehouses and Databases.\n",
    "\n",
    "_Note that AWS Data Wrangler is simply a Python library that uses existing AWS Services.  AWS Data Wrangler is not a separate AWS Service.  You install AWS Data Wrangler through `pip install` as we will see next._"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# _Pre-Requisite: Make Sure You Created an Athena Table for Both TSV and Parquet in Previous Notebooks_"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r ingest_create_athena_table_tsv_passed"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    ingest_create_athena_table_tsv_passed\n",
    "except NameError:\n",
    "    print(\"++++++++++++++++++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not register the TSV Data.\")\n",
    "    print(\"++++++++++++++++++++++++++++++++++++++++++++++\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(ingest_create_athena_table_tsv_passed)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if not ingest_create_athena_table_tsv_passed:\n",
    "    print(\"++++++++++++++++++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not register the TSV Data.\")\n",
    "    print(\"++++++++++++++++++++++++++++++++++++++++++++++\")\n",
    "else:\n",
    "    print(\"[OK]\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r ingest_create_athena_table_parquet_passed"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    ingest_create_athena_table_parquet_passed\n",
    "except NameError:\n",
    "    print(\"++++++++++++++++++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not convert into Parquet data.\")\n",
    "    print(\"++++++++++++++++++++++++++++++++++++++++++++++\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(ingest_create_athena_table_parquet_passed)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if not ingest_create_athena_table_parquet_passed:\n",
    "    print(\"++++++++++++++++++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not convert into Parquet data.\")\n",
    "    print(\"++++++++++++++++++++++++++++++++++++++++++++++\")\n",
    "else:\n",
    "    print(\"[OK]\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Setup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sagemaker\n",
    "import boto3\n",
    "\n",
    "sess = sagemaker.Session()\n",
    "bucket = sess.default_bucket()\n",
    "role = sagemaker.get_execution_role()\n",
    "region = boto3.Session().region_name\n",
    "\n",
    "sm = boto3.Session().client(service_name=\"sagemaker\", region_name=region)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import awswrangler as wr"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Query Parquet from S3 with Push-Down Filters\n",
    "\n",
    "Read Apache Parquet file(s) from from a received S3 prefix or list of S3 objects paths.\n",
    "\n",
    "The concept of Dataset goes beyond the simple idea of files and enable more complex features like partitioning and catalog integration (AWS Glue Catalog): \n",
    "\n",
    "_dataset (bool)_ – If True read a parquet dataset instead of simple file(s) loading all the related partitions as columns."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "p_filter = lambda x: x[\"product_category\"] == \"Digital_Software\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "path = \"s3://{}/amazon-reviews-pds/parquet/\".format(bucket)\n",
    "df_parquet_results = wr.s3.read_parquet(\n",
    "    path, columns=[\"star_rating\", \"product_category\", \"review_body\"], partition_filter=p_filter, dataset=True\n",
    ")\n",
    "df_parquet_results.shape"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_parquet_results.head(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Query Parquet from S3 in Chunks"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Batching (chunked argument) (Memory Friendly):\n",
    "\n",
    "Will enable the function to return a Iterable of DataFrames instead of a regular DataFrame.\n",
    "\n",
    "There are two batching strategies on Wrangler:\n",
    "* If chunked=True, a new DataFrame will be returned for each file in your path/dataset.\n",
    "* If chunked=INTEGER, Wrangler will iterate on the data by number of rows equal to the received INTEGER.\n",
    "\n",
    "P.S. chunked=True if faster and uses less memory while chunked=INTEGER is more precise in number of rows for each Dataframe."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "path = \"s3://{}/amazon-reviews-pds/parquet/\".format(bucket)\n",
    "chunk_iter = wr.s3.read_parquet(\n",
    "    path,\n",
    "    columns=[\"star_rating\", \"product_category\", \"review_body\"],\n",
    "    # filters=[(\"product_category\", \"=\", \"Digital_Software\")],\n",
    "    partition_filter=p_filter,\n",
    "    dataset=True,\n",
    "    chunked=True,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "print(next(chunk_iter))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Query the Glue Catalog (ie. Hive Metastore)\n",
    "Get an iterator of tables."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "database_name = \"dsoaws\"\n",
    "table_name_tsv = \"amazon_reviews_tsv\"\n",
    "table_name_parquet = \"amazon_reviews_parquet\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "for table in wr.catalog.get_tables(database=\"dsoaws\"):\n",
    "    print(table[\"Name\"])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Query from Athena\n",
    "Execute any SQL query on AWS Athena and return the results as a Pandas DataFrame.  \n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "df = wr.athena.read_sql_query(sql=\"SELECT * FROM {} LIMIT 5000\".format(table_name_parquet), database=database_name)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df.head(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Query from Athena in Chunks\n",
    "Retrieving in chunks can help reduce memory requirements.  \n",
    "\n",
    "_This will take a few seconds._"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "chunk_iter = wr.athena.read_sql_query(\n",
    "    sql=\"SELECT * FROM {} LIMIT 5000\".format(table_name_parquet),\n",
    "    database=\"{}\".format(database_name),\n",
    "    chunksize=64_000,  # 64 KB Chunks\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(next(chunk_iter))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Release Resources"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%html\n",
    "\n",
    "<p><b>Shutting down your kernel for this notebook to release resources.</b></p>\n",
    "<button class=\"sm-command-button\" data-commandlinker-command=\"kernelmenu:shutdown\" style=\"display:none;\">Shutdown Kernel</button>\n",
    "        \n",
    "<script>\n",
    "try {\n",
    "    els = document.getElementsByClassName(\"sm-command-button\");\n",
    "    els[0].click();\n",
    "}\n",
    "catch(err) {\n",
    "    // NoOp\n",
    "}    \n",
    "</script>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%javascript\n",
    "\n",
    "try {\n",
    "    Jupyter.notebook.save_checkpoint();\n",
    "    Jupyter.notebook.session.delete();\n",
    "}\n",
    "catch(err) {\n",
    "    // NoOp\n",
    "}"
   ]
  }
 ],
 "metadata": {
  "instance_type": "ml.t3.medium",
  "kernelspec": {
   "display_name": "Python 3 (Data Science)",
   "language": "python",
   "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
